﻿using RabbitMQ.Gateway.Admin;
using RabbitMQ.Gateway.Config;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Threading.Tasks;
using System.Reflection;
using System.Web.Configuration;

namespace RabbitMQ.Gateway
{
    /// <summary>
    /// 
    /// </summary>
    /// <modifier>cy.z</modifier>
	public class GatewayFactory : IGatewayFactory, System.IDisposable
	{
        protected System.Collections.Generic.Dictionary<string, IConnection> _connections;
       
		protected Configuration config;
		protected bool disposed;
        /// <summary>
        /// 是否web站点
        /// </summary>
        private string _iswebsite;
        /// <summary>
        /// 是否应用队列
        /// </summary>
        private static readonly string _isenablemq = System.Configuration.ConfigurationManager.AppSettings["MQEnable"] ?? string.Empty;

		public GatewayFactory()
        {
            if (_isenablemq.Equals("false"))
            {
                return;
            }

            this.disposed = false;
            _iswebsite = System.Configuration.ConfigurationManager.AppSettings["IsWebSite"] ?? string.Empty;
            if (_iswebsite.Equals("true", StringComparison.CurrentCultureIgnoreCase))
            {
                this.config = WebConfigurationManager.OpenWebConfiguration("~");
            }
            else
            {
                this.config = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
            }
            this.Initialize();
        }
		~GatewayFactory()
		{
            if (_isenablemq.Equals("false"))
            {
                return;
            }
            this.Dispose(false);
		}
		public GatewayFactory(Configuration config)
		{
			this.disposed = false;
			this.config = config;
			this.Initialize();
		}

        private string IsInitMQ = System.Configuration.ConfigurationManager.AppSettings["IsInitRabbitMQ"] ?? "true";

        private void Initialize()
		{
            this._connections = new System.Collections.Generic.Dictionary<string, IConnection>();
            this.InitializeConnections();
            if (IsInitMQ.Equals("true"))
            {
                RabbitAdmin.InitializeExchanges(this._connections);
            }
        }

        public void InitMQInfo()
        {
            this._connections = new System.Collections.Generic.Dictionary<string, IConnection>();
            this.InitializeConnections();
            RabbitAdmin.InitializeExchanges(this._connections);
        }

        public void InitializeConnections()
		{
			ConnectionSection objects = this.config.GetSection("AMQPConnection/ConnectionSettings") as ConnectionSection;
			if (objects == null)
			{
				throw new ConfigurationErrorsException("The AMQPConnection/ConnectionSettings configuration section is missing from the configuration file");
			}
			if (objects.ConnectionList.Count == 0)
			{
				throw new ConfigurationErrorsException("The AMQPConnection/ConnectionSettings requires a Connection element to communicate with the RabbitMQ server");
			}
			Parallel.For(0, objects.ConnectionList.Count, delegate(int i)
			{
				RabbitMQ.Gateway.Config.ConnectionFactory connectionFactory = objects.ConnectionList[i];
				RabbitMQ.Client.ConnectionFactory connectionFactory2 = new RabbitMQ.Client.ConnectionFactory();
				connectionFactory2.UserName = connectionFactory.UserName;
				connectionFactory2.Password = connectionFactory.Password;
				connectionFactory2.HostName = connectionFactory.Host;
				connectionFactory2.Port = connectionFactory.Port;
				connectionFactory2.VirtualHost = connectionFactory.VHost;
				lock (this._connections)
				{
                    try
                    {
                        this._connections.Add(connectionFactory.Name, connectionFactory2.CreateConnection());
                    }
                    catch (Exception ex)
                    {
                        RabbitMQ.Gateway.Log.loger.Error(ex);
                    }
				}
			});
		}

        private ConnectionSection GetConfigSection()
		{
			ConnectionSection connectionSection = this.config.GetSection("AMQPConnection/ConnectionSettings") as ConnectionSection;
			if (connectionSection == null)
			{
				throw new ConfigurationErrorsException("The AMQPConnection/ConnectionSettings configuration section is missing from the configuration file");
			}
			return connectionSection;
		}

		public IPublisher GetPublisher(string PublisherName, ConvertToMessage converter,bool ReConnection = false)
		{
			ConnectionSection configSection = this.GetConfigSection();
			RabbitMQ.Gateway.Config.Publisher publisher = configSection.PublisherList[PublisherName];
			if (publisher == null)
			{
				throw new ConfigurationErrorsException("The AMQPConnection/ConnectionSettings Publisher section is missing from the configuration file");
			}
            IConnection conncetion = (ReConnection ? ReConnections(publisher.Connection) : this.GetConnection(publisher.Connection));
            return new Publisher(conncetion, converter, publisher.Exchange)
			{
				Immediate = publisher.Immediate,
				Mandatory = publisher.Mandatory
			};
		}

        public List<IPublisher> GetPublisherList(string PublisherName, ConvertToMessage converter, bool ReConnection = false)
        {
            List<IPublisher> pusers = new List<IPublisher>();
            ConnectionSection configSection = this.GetConfigSection();

            RabbitMQ.Gateway.Config.Publisher publisher = null;
            if (!string.IsNullOrWhiteSpace(PublisherName))
            {
                publisher = configSection.PublisherList[PublisherName];
            }
            else
            {
                publisher = configSection.PublisherList[0];
            }
            
            if (publisher == null)
            {
                throw new ConfigurationErrorsException("The AMQPConnection/ConnectionSettings Publisher section is missing from the configuration file");
            }
            IConnection conncetion;
            string[] cons = publisher.Connection.Split(',');
            for (int i = 0; i < cons.Length; i++) {
                conncetion = (ReConnection ? ReConnections(cons[i]) : this.GetConnection(cons[i]));

                if (conncetion == null) continue;

                pusers.Add(new Publisher(conncetion, converter, publisher.Exchange, publisher.RouteKey)
                {
                    Immediate = publisher.Immediate,
                    Mandatory = publisher.Mandatory,
                });
            }
            return pusers;
        }

        public IAsyncConsumerProcessor GetAsyncReceiver(string ReceiverName, MessageConsumer handler,bool OneByOne=true)
		{
			ConnectionSection configSection = this.GetConfigSection();
			AsyncReceiver asyncReceiver = configSection.AsyncReceiverList[ReceiverName];
			if (asyncReceiver == null)
			{
				throw new ConfigurationErrorsException(string.Format("The AMQPConnection/ConnectionSettings AsyncReciever {0} is missing from the configuration file", ReceiverName));
			}
			return new AsyncConsumerProcessor(this.GetConnection(asyncReceiver.Connection), asyncReceiver.MaxThreads, handler, asyncReceiver.Queue, OneByOne);
		}

        /// <summary>
        /// 
        /// </summary>
        /// <returns>key-Name,value - logername</returns>
        public Dictionary<string,string> GetAsyncReceiverList()
        {
            Configuration cf;
            string _iswebsite = System.Configuration.ConfigurationManager.AppSettings["IsWebSite"] ?? string.Empty;
            if (_iswebsite.Equals("true", StringComparison.CurrentCultureIgnoreCase))
            {
                cf = WebConfigurationManager.OpenWebConfiguration("~");
            }
            else
            {
                cf = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
            }
            ConnectionSection connectionSection = cf.GetSection("AMQPConnection/ConnectionSettings") as ConnectionSection;
            if (connectionSection == null)
            {
                throw new ConfigurationErrorsException("The AMQPConnection/ConnectionSettings configuration section is missing from the configuration file");
            }
            if (GetConfigSection().AsyncReceiverList.Count != connectionSection.AsyncReceiverList.Count)
            {
                this.config = cf;
            }
            Dictionary<string, string> names = new Dictionary<string, string>();
            for (int i = 0; i < connectionSection.AsyncReceiverList.Count; i++)
            {
                names.Add(connectionSection.AsyncReceiverList[i].Name, connectionSection.AsyncReceiverList[i].LogerName);
            }

            return names;
        }
        
        /// <summary>
        /// default log
        /// </summary>
        private static readonly log4net.ILog log = log4net.LogManager.GetLogger("root");

        public IAsyncConsumerProcessor GetAsyncReceiver(string ReceiverName, log4net.ILog loger,Func<object,string,string,bool> ErrorMethod, bool OneByOne = true,bool ReConnection = false)
        {
            ConnectionSection configSection = this.GetConfigSection();
            AsyncReceiver asyncReceiver = configSection.AsyncReceiverList[ReceiverName];
            MessageConsumer handler;
           
            try
            {
                //通过反射注入消费者组件
                Type type = Assembly.Load(asyncReceiver.Assembly).GetType(asyncReceiver.Class);
                IMessageConsumer consumer = (IMessageConsumer)Activator.CreateInstance(type);
                handler = consumer.ConsumeMessage;
            }
            catch (Exception ex)
            {
                RabbitMQ.Gateway.Log.loger.Error(ex);
                throw ex;
            }
            if (asyncReceiver == null)
            {
                throw new ConfigurationErrorsException(string.Format("The AMQPConnection/ConnectionSettings AsyncReciever {0} is missing from the configuration file", ReceiverName));
            }
            IConnection conncetion = (ReConnection ? ReConnections(asyncReceiver.Connection) : this.GetConnection(asyncReceiver.Connection));
            return new AsyncConsumerProcessor(conncetion, asyncReceiver.MaxThreads, handler,ErrorMethod, asyncReceiver.Queue, loger, OneByOne);
        }

        private IConnection GetConnection(string name)
		{
            if (!this._connections.ContainsKey(name)) return null;

			IConnection connection = this._connections[name];
			if (connection == null)
			{
				throw new ConfigurationErrorsException(string.Format("A connection with the name {0} does not exist in the configuration file", name));
			}
			return connection;
		}

        /// <summary>
        ///  16.3.9+
        /// </summary>
        /// <param name="name"></param>
        /// <returns></returns>
        private IConnection ReConnections(string name)
        {
            RabbitMQ.Gateway.Log.loger.Info("********* 尝试重新连接【开始】 ***********");
            ConnectionSection objects = this.config.GetSection("AMQPConnection/ConnectionSettings") as ConnectionSection;
            IConnection connection = null;
            Parallel.For(0, objects.ConnectionList.Count, (i, ParallelLoopState) =>
            {
                RabbitMQ.Gateway.Config.ConnectionFactory connectionFactory = objects.ConnectionList[i];
                if (connectionFactory.Name == name)
                {
                    RabbitMQ.Client.ConnectionFactory connectionFactory2 = new RabbitMQ.Client.ConnectionFactory();
                    connectionFactory2.UserName = connectionFactory.UserName;
                    connectionFactory2.Password = connectionFactory.Password;
                    connectionFactory2.HostName = connectionFactory.Host;
                    connectionFactory2.Port = connectionFactory.Port;
                    connectionFactory2.VirtualHost = connectionFactory.VHost;
                    lock (this._connections)
                    {
                        if (this._connections == null)
                        {
                            this._connections = new System.Collections.Generic.Dictionary<string, IConnection>();
                        }
                        else
                        {
                            if (this._connections.ContainsKey(name))
                            {
                                this._connections.Remove(name);
                            }
                        }
                        try
                        {
                            connection = connectionFactory2.CreateConnection();
                            this._connections.Add(connectionFactory.Name, connection);
                            RabbitMQ.Gateway.Log.loger.Info("********* 尝试重新连接【成功】 ***********");
                            ParallelLoopState.Stop();
                            return;
                        }
                        catch (Exception ex)
                        {
                            RabbitMQ.Gateway.Log.loger.Info("********* 尝试重新连接【异常】 ***********");
                            RabbitMQ.Gateway.Log.loger.Error(ex);
                            ParallelLoopState.Stop();
                            return;
                        }
                    }
                }
            });
            return connection;
        }




        public void Dispose()
		{
			this.Dispose(true);
		}
		protected virtual void Dispose(bool disposing)
		{
			try
			{
				if (!disposing && !this.disposed)
				{
                    foreach (IConnection con in this._connections.Values)
                    {
                        con.Close(1, "object destructor called");
                    }
				}
				if (disposing && !this.disposed)
				{
                    foreach (IConnection con in this._connections.Values)
                    {
                        con.Close(1, "object destructor called");
                    }
					this.disposed = true;
				}
			}
			catch (AlreadyClosedException ex)
			{
                log.Error(ex);
			}
			this.config = null;
			this._connections = null;
			if (disposing)
			{
				System.GC.SuppressFinalize(this);
			}
		}
	}
}
